package org.codehaus.activemq.message.util;

import java.util.LinkedList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.message.Packet;

public class MemoryBoundedQueue
  implements BoundedPacketQueue
{
  private MemoryBoundedQueueManager queueManager;
  private String name;
  private boolean stopped = false;
  private boolean closed = false;
  private long memoryUsedByThisQueue;
  private Object outLock = new Object();
  private Object inLock = new Object();
  private LinkedList internalList = new LinkedList();
  private static final int WAIT_TIMEOUT = 100;
  private static final Log log = LogFactory.getLog(MemoryBoundedQueueManager.class);

  MemoryBoundedQueue(String name, MemoryBoundedQueueManager manager)
  {
    this.name = name;
    this.queueManager = manager;
  }

  public String getName()
  {
    return this.name;
  }

  public String toString()
  {
    return "" + this.name + " , cardinality = " + size() + " memory usage = " + this.memoryUsedByThisQueue;
  }

  public int size()
  {
    return this.internalList.size();
  }

  public long getLocalMemoryUsedByThisQueue()
  {
    return this.memoryUsedByThisQueue;
  }

  public void close()
  {
    try
    {
      clear();
      this.closed = true;
      synchronized (this.outLock) {
        this.outLock.notifyAll();
      }
      synchronized (this.inLock) {
        this.inLock.notifyAll();
      }
    }
    catch (Throwable e) {
      e.printStackTrace();
    }
    finally {
      this.queueManager.removeMemoryBoundedQueue(getName());
    }
  }

  public void enqueueNoBlock(Packet packet)
  {
    if (!this.closed)
      synchronized (this.outLock) {
        this.internalList.add(packet);
        incrementMemoryUsed(packet);
        this.outLock.notify();
      }
  }

  public void enqueue(Packet packet)
  {
    if (!this.queueManager.isFull()) {
      enqueueNoBlock(packet);
    }
    else {
      synchronized (this.inLock) {
        try {
          while ((this.queueManager.isFull()) && (!this.closed))
            this.inLock.wait(100L);
        }
        catch (InterruptedException ie)
        {
        }
      }
      enqueueNoBlock(packet);
    }
  }

  public final void enqueueFirstNoBlock(Packet packet)
  {
    if (!this.closed)
      synchronized (this.outLock) {
        this.internalList.addFirst(packet);
        incrementMemoryUsed(packet);
        this.outLock.notify();
      }
  }

  public void enqueueFirst(Packet packet)
    throws InterruptedException
  {
    if (!this.queueManager.isFull()) {
      enqueueFirstNoBlock(packet);
    }
    else {
      synchronized (this.inLock) {
        while ((this.queueManager.isFull()) && (!this.closed)) {
          this.inLock.wait(100L);
        }
      }
      enqueueFirstNoBlock(packet);
    }
  }

  public Packet dequeue()
    throws InterruptedException
  {
    Packet result = null;
    synchronized (this.outLock) {
      while ((this.internalList.isEmpty()) && (!this.closed)) {
        this.outLock.wait(100L);
      }
      result = dequeueNoWait();
    }
    return result;
  }

  public Packet dequeue(long timeInMillis)
    throws InterruptedException
  {
    if (timeInMillis == 0L) {
      return dequeue();
    }

    synchronized (this.outLock)
    {
      if ((timeInMillis >= 0L) && 
        (this.internalList.isEmpty()) && (!this.closed)) {
        this.outLock.wait(timeInMillis);
      }

      return dequeueNoWait();
    }
  }

  public Packet dequeueNoWait()
    throws InterruptedException
  {
    Packet packet = null;
    if (this.stopped) {
      synchronized (this.outLock) {
        while ((this.stopped) && (!this.closed)) {
          this.outLock.wait(100L);
        }
      }
    }
    synchronized (this.outLock) {
      if (!this.internalList.isEmpty()) {
        packet = (Packet)this.internalList.removeFirst();
        decrementMemoryUsed(packet);
      }
    }
    return packet;
  }

  public boolean isStarted()
  {
    return !this.stopped;
  }

  public void stop()
  {
    synchronized (this.outLock) {
      this.stopped = true;
    }
  }

  public void start()
  {
    this.stopped = false;
    synchronized (this.outLock) {
      this.outLock.notifyAll();
    }
    synchronized (this.inLock) {
      this.inLock.notifyAll();
    }
  }

  public boolean remove(Packet packet)
  {
    boolean result = false;
    synchronized (this.inLock) {
      if (!this.internalList.isEmpty()) {
        result = this.internalList.remove(packet);
      }
      if (result) {
        decrementMemoryUsed(packet);
        if (!this.queueManager.isFull()) {
          this.inLock.notify();
        }
      }
    }
    return result;
  }

  public void clear()
  {
    synchronized (this.inLock) {
      while (!this.internalList.isEmpty()) {
        Packet packet = (Packet)this.internalList.removeFirst();
        decrementMemoryUsed(packet);
      }
      this.inLock.notify();
    }
  }

  public boolean isEmpty()
  {
    return this.internalList.isEmpty();
  }

  public Packet get(int index)
  {
    return (Packet)this.internalList.get(index);
  }

  public List getContents()
  {
    return (List)this.internalList.clone();
  }

  private void incrementMemoryUsed(Packet packet) {
    if (packet != null)
      this.memoryUsedByThisQueue += this.queueManager.incrementMemoryUsed(packet);
  }

  private void decrementMemoryUsed(Packet packet)
  {
    if (packet != null)
      this.memoryUsedByThisQueue -= this.queueManager.decrementMemoryUsed(packet);
  }
}